using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using HslCommunication.BasicFramework;
using HslCommunication.Core.Net;

namespace HslCommunication.MQTT
{
	/// <summary>
	/// 一个Mqtt的服务器类对象，本服务器支持发布订阅操作，支持服务器从强制推送数据，指定客户端推送，支持同步网络访问的操作。从而定制化出自己的服务器，详细的使用说明可以参见代码api文档示例<br />
	/// An Mqtt server class object, this server supports publish and subscribe operations, supports the server to push data from the forced, specified client push, 
	/// and supports the operation of synchronous network access. In order to customize your own server, detailed instructions can be found in the code api documentation example
	/// </summary>
	/// <remarks>
	/// 本MQTT服务器功能丰富，可以同时实现，用户名密码验证，在线客户端的管理，数据订阅推送，单纯的数据收发，心跳检测，同步数据访问，详细参照下面的示例说明
	/// </remarks>
	/// <example>
	/// 最简单的使用，就是实例化，启动服务即可
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttServerSample.cs" region="Sample1" title="简单的实例化" />
	/// 当然了，我们可以稍微的复杂一点，加一个功能，验证连接的客户端操作
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttServerSample.cs" region="Sample2" title="增加验证" />
	/// 你也可以对clientid进行过滤验证，只要结果返回不是0，就可以了。接下来我们实现一个功能，所有客户端的发布的消息在控制台打印出来,
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttServerSample.cs" region="Sample3" title="打印所有发布" />
	/// 捕获客户端刚刚上线的时候，方便我们进行一些额外的操作信息。下面的意思就是返回一个数据，将数据发送到指定的会话内容上去
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttServerSample.cs" region="Sample4" title="客户端上线信息" />
	/// 下面演示如何从服务器端发布数据信息，包括多种发布的方法，消息是否驻留，详细看说明即可
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttServerSample.cs" region="Sample5" title="服务器发布" />
	/// 下面演示如何支持同步网络访问，当客户端是同步网络访问时，协议内容会变成HUSL，即被视为同步客户端，进行相关的操作。
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttServerSample.cs" region="Sample6" title="同步访问支持" />
	/// 如果需要查看在线信息，可以随时获取<see cref="P:HslCommunication.MQTT.MqttServer.OnlineCount" />属性，如果需要查看报文信息，可以实例化日志，参考日志的说明即可。
	/// </example>
	public class MqttServer : NetworkServerBase
	{
		/// <summary>
		/// Mqtt的消息收到委托
		/// </summary>
		/// <param name="session">当前会话的内容</param>
		/// <param name="message">Mqtt的消息</param>
		public delegate void OnClientApplicationMessageReceiveDelegate(MqttSession session, MqttClientApplicationMessage message);

		/// <summary>
		/// 当前mqtt客户端连接上服务器的事件委托
		/// </summary>
		/// <param name="session">当前的会话对象</param>
		public delegate void OnClientConnectedDelegate(MqttSession session);

		/// <summary>
		/// 验证的委托
		/// </summary>
		/// <param name="clientId">客户端的id</param>
		/// <param name="userName">用户名</param>
		/// <param name="passwrod">密码</param>
		/// <returns>0则是通过，否则，就是连接失败</returns>
		public delegate int ClientVerificationDelegate(string clientId, string userName, string passwrod);

		private readonly Dictionary<string, byte[]> retainKeys;

		private readonly object keysLock;

		private readonly List<MqttSession> mqttSessions = new List<MqttSession>();

		private readonly object sessionsLock = new object();

		private Timer timerHeart;

		/// <summary>
		/// 获取当前的在线的客户端数量<br />
		/// Gets the number of clients currently online
		/// </summary>
		public int OnlineCount => mqttSessions.Count;

		/// <summary>
		/// 获得当前所有的在线的MQTT客户端信息，包括异步的客户端及同步请求的客户端。<br />
		/// Obtain all current online MQTT client information, including asynchronous client and synchronous request client.
		/// </summary>
		public MqttSession[] OnlineSessions
		{
			get
			{
				MqttSession[] result = null;
				lock (sessionsLock)
				{
					result = mqttSessions.ToArray();
				}
				return result;
			}
		}

		/// <summary>
		/// 获得当前异步客户端在线的MQTT客户端信息。<br />
		/// Get the MQTT client information of the current asynchronous client online.
		/// </summary>
		public MqttSession[] MqttOnlineSessions
		{
			get
			{
				MqttSession[] result = null;
				lock (sessionsLock)
				{
					result = mqttSessions.Where((MqttSession m) => m.Protocol == "MQTT").ToArray();
				}
				return result;
			}
		}

		/// <summary>
		/// 获得当前同步客户端在线的MQTT客户端信息，如果客户端是短连接，将难以捕获在在线信息。<br />
		/// Obtain the MQTT client information of the current synchronization client online. If the client is a short connection, it will be difficult to capture the online information. <br />
		/// </summary>
		public MqttSession[] SyncOnlineSessions
		{
			get
			{
				MqttSession[] result = null;
				lock (sessionsLock)
				{
					result = mqttSessions.Where((MqttSession m) => m.Protocol == "HUSL").ToArray();
				}
				return result;
			}
		}

		/// <summary>
		/// 当收到客户端发来的<see cref="T:HslCommunication.MQTT.MqttClientApplicationMessage" />消息时触发<br />
		/// Triggered when a <see cref="T:HslCommunication.MQTT.MqttClientApplicationMessage" /> message is received from the client
		///             </summary>
		public event OnClientApplicationMessageReceiveDelegate OnClientApplicationMessageReceive;

		/// <summary>
		/// Mqtt的客户端连接上来时触发<br />
		/// Triggered when Mqtt client connects
		/// </summary>
		public event OnClientConnectedDelegate OnClientConnected;

		/// <summary>
		/// Mqtt的客户端下线时触发<br />
		/// Triggered when Mqtt client connects
		/// </summary>
		public event OnClientConnectedDelegate OnClientDisConnected;

		/// <summary>
		/// 当客户端连接时，触发的验证事件<br />
		/// Validation event triggered when the client connects
		/// </summary>
		public event ClientVerificationDelegate ClientVerification;

		/// <summary>
		/// 实例化一个MQTT协议的服务器<br />
		/// Instantiate a MQTT protocol server
		/// </summary>
		public MqttServer()
		{
			retainKeys = new Dictionary<string, byte[]>();
			keysLock = new object();
			timerHeart = new Timer(ThreadTimerHeartCheck, null, 2000, 10000);
		}

		/// <inheritdoc />
		protected override async void ThreadPoolLogin(Socket socket, IPEndPoint endPoint)
		{
			OperateResult<byte, byte[]> readMqtt = await ReceiveMqttMessageAsync(socket);
			if (!readMqtt.IsSuccess)
			{
				return;
			}
			OperateResult<int, MqttSession> check = CheckMqttConnection(readMqtt.Content1, readMqtt.Content2, socket, endPoint);
			if (!check.IsSuccess)
			{
				base.LogNet?.WriteInfo(ToString(), check.Message);
				socket?.Close();
				return;
			}
			if (check.Content1 != 0)
			{
				await SendAsync(socket, MqttHelper.BuildMqttCommand(2, 0, null, new byte[2]
				{
					0,
					(byte)check.Content1
				}).Content);
				socket?.Close();
				return;
			}
			await SendAsync(socket, MqttHelper.BuildMqttCommand(2, 0, null, new byte[2]).Content);
			try
			{
				socket.BeginReceive(new byte[0], 0, 0, SocketFlags.None, SocketReceiveCallback, check.Content2);
				AddMqttSession(check.Content2);
			}
			catch (Exception ex2)
			{
				Exception ex = ex2;
				base.LogNet?.WriteDebug(ToString(), "Client Online Exception : " + ex.Message);
				return;
			}
			if (check.Content2.Protocol == "MQTT")
			{
				this.OnClientConnected?.Invoke(check.Content2);
			}
		}

		private async void SocketReceiveCallback(IAsyncResult ar)
		{
			object asyncState = ar.AsyncState;
			MqttSession mqttSession = asyncState as MqttSession;
			if (mqttSession == null)
			{
				return;
			}
			try
			{
				mqttSession.MqttSocket.EndReceive(ar);
			}
			catch (Exception ex2)
			{
				Exception ex = ex2;
				base.LogNet?.WriteDebug(ToString(), "ReceiveCallback Failed:" + ex.Message);
				RemoveAndCloseSession(mqttSession);
				return;
			}
			OperateResult<byte, byte[]> readMqtt = (!(mqttSession.Protocol == "MQTT")) ? (await ReceiveMqttMessageAsync(mqttSession.MqttSocket, delegate(long already, long total)
			{
				string message = (total > 0) ? (already * 100 / total).ToString() : "100";
				byte[] array = new byte[16];
				BitConverter.GetBytes(already).CopyTo(array, 0);
				BitConverter.GetBytes(total).CopyTo(array, 8);
				Send(mqttSession.MqttSocket, MqttHelper.BuildMqttCommand(15, 0, MqttHelper.BuildSegCommandByString(message), array).Content);
			})) : (await ReceiveMqttMessageAsync(mqttSession.MqttSocket));
			if (!readMqtt.IsSuccess)
			{
				RemoveAndCloseSession(mqttSession);
				return;
			}
			byte code = readMqtt.Content1;
			byte[] data = readMqtt.Content2;
			try
			{
				if (code >> 4 == 14)
				{
					RemoveAndCloseSession(mqttSession);
					return;
				}
				mqttSession.MqttSocket.BeginReceive(new byte[0], 0, 0, SocketFlags.None, SocketReceiveCallback, mqttSession);
			}
			catch
			{
				RemoveAndCloseSession(mqttSession);
				return;
			}
			if (code >> 4 == 3)
			{
				mqttSession.ActiveTime = DateTime.Now;
				DealWithPublish(mqttSession, code, data);
			}
			else if (code >> 4 == 6)
			{
				await SendAsync(mqttSession.MqttSocket, MqttHelper.BuildMqttCommand(7, 0, null, data).Content);
			}
			else if (code >> 4 == 8)
			{
				mqttSession.ActiveTime = DateTime.Now;
				DealWithSubscribe(mqttSession, code, data);
			}
			else if (code >> 4 == 10)
			{
				mqttSession.ActiveTime = DateTime.Now;
				DealWithUnSubscribe(mqttSession, code, data);
			}
			else if (code >> 4 == 12)
			{
				mqttSession.ActiveTime = DateTime.Now;
				await SendAsync(mqttSession.MqttSocket, MqttHelper.BuildMqttCommand(13, 0, null, null).Content);
			}
		}

		private OperateResult<int, MqttSession> CheckMqttConnection(byte mqttCode, byte[] content, Socket socket, IPEndPoint endPoint)
		{
			if (mqttCode >> 4 != 1)
			{
				return new OperateResult<int, MqttSession>("Client Send Faied, And Close!");
			}
			if (content.Length < 10)
			{
				return new OperateResult<int, MqttSession>("Receive Data Too Short:" + SoftBasic.ByteToHexString(content, ' '));
			}
			string @string = Encoding.ASCII.GetString(content, 2, 4);
			if (@string != "MQTT" && @string != "HUSL")
			{
				return new OperateResult<int, MqttSession>("Not Mqtt Client Connection");
			}
			try
			{
				int index = 10;
				string clientId = MqttHelper.ExtraMsgFromBytes(content, ref index);
				string text = ((content[7] & 4) == 4) ? MqttHelper.ExtraMsgFromBytes(content, ref index) : string.Empty;
				string text2 = ((content[7] & 4) == 4) ? MqttHelper.ExtraMsgFromBytes(content, ref index) : string.Empty;
				string userName = ((content[7] & 0x80) == 128) ? MqttHelper.ExtraMsgFromBytes(content, ref index) : string.Empty;
				string passwrod = ((content[7] & 0x40) == 64) ? MqttHelper.ExtraMsgFromBytes(content, ref index) : string.Empty;
				int num = content[8] * 256 + content[9];
				int value = (this.ClientVerification != null) ? this.ClientVerification(clientId, userName, passwrod) : 0;
				MqttSession mqttSession = new MqttSession(endPoint, @string)
				{
					MqttSocket = socket,
					ClientId = clientId,
					UserName = userName
				};
				if (num > 0)
				{
					mqttSession.ActiveTimeSpan = TimeSpan.FromSeconds(num);
				}
				return OperateResult.CreateSuccessResult(value, mqttSession);
			}
			catch (Exception ex)
			{
				return new OperateResult<int, MqttSession>("Client Online Exception : " + ex.Message);
			}
		}

		/// <inheritdoc />
		protected override void StartInitialization()
		{
		}

		/// <inheritdoc />
		protected override void CloseAction()
		{
			base.CloseAction();
			lock (sessionsLock)
			{
				for (int i = 0; i < mqttSessions.Count; i++)
				{
					mqttSessions[i].MqttSocket?.Close();
				}
				mqttSessions.Clear();
			}
		}

		private void ThreadTimerHeartCheck(object obj)
		{
			MqttSession[] array = null;
			lock (sessionsLock)
			{
				array = mqttSessions.ToArray();
			}
			if (array == null || array.Length == 0)
			{
				return;
			}
			for (int i = 0; i < array.Length; i++)
			{
				if (array[i].Protocol == "MQTT" && DateTime.Now - array[i].ActiveTime > array[i].ActiveTimeSpan)
				{
					RemoveAndCloseSession(array[i]);
				}
			}
		}

		private void DealWithPublish(MqttSession session, byte code, byte[] data)
		{
			bool flag = (code & 8) == 8;
			int num = (((code & 4) == 4) ? 2 : 0) + (((code & 2) == 2) ? 1 : 0);
			MqttQualityOfServiceLevel mqttQualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
			switch (num)
			{
			case 1:
				mqttQualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce;
				break;
			case 2:
				mqttQualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce;
				break;
			case 3:
				mqttQualityOfServiceLevel = MqttQualityOfServiceLevel.OnlyTransfer;
				break;
			}
			bool flag2 = (code & 1) == 1;
			int data2 = 0;
			int index = 0;
			string topic = MqttHelper.ExtraMsgFromBytes(data, ref index);
			if (num > 0)
			{
				data2 = MqttHelper.ExtraIntFromBytes(data, ref index);
			}
			byte[] payload = SoftBasic.ArrayRemoveBegin(data, index);
			switch (mqttQualityOfServiceLevel)
			{
			case MqttQualityOfServiceLevel.AtLeastOnce:
				Send(session.MqttSocket, MqttHelper.BuildMqttCommand(4, 0, null, MqttHelper.BuildIntBytes(data2)).Content);
				break;
			case MqttQualityOfServiceLevel.ExactlyOnce:
				Send(session.MqttSocket, MqttHelper.BuildMqttCommand(5, 0, null, MqttHelper.BuildIntBytes(data2)).Content);
				break;
			}
			MqttClientApplicationMessage mqttClientApplicationMessage = new MqttClientApplicationMessage
			{
				ClientId = session.ClientId,
				QualityOfServiceLevel = mqttQualityOfServiceLevel,
				Retain = flag2,
				Topic = topic,
				UserName = session.UserName,
				Payload = payload
			};
			this.OnClientApplicationMessageReceive?.Invoke(session, mqttClientApplicationMessage);
			if (flag2)
			{
				RetainTopicPayload(topic, payload);
			}
			if (mqttQualityOfServiceLevel != MqttQualityOfServiceLevel.OnlyTransfer && !mqttClientApplicationMessage.IsCancelPublish)
			{
				PublishTopicPayload(topic, payload);
			}
		}

		/// <summary>
		/// 将消息进行驻留
		/// </summary>
		/// <param name="topic">消息的主题</param>
		/// <param name="payload">当前的数据负载</param>
		private void RetainTopicPayload(string topic, byte[] payload)
		{
			lock (keysLock)
			{
				if (retainKeys.ContainsKey(topic))
				{
					retainKeys[topic] = payload;
				}
				else
				{
					retainKeys.Add(topic, payload);
				}
			}
		}

		private void DealWithSubscribe(MqttSession session, byte code, byte[] data)
		{
			int num = 0;
			int index = 0;
			num = MqttHelper.ExtraIntFromBytes(data, ref index);
			List<string> list = new List<string>();
			while (index < data.Length - 1)
			{
				list.Add(MqttHelper.ExtraMsgFromBytes(data, ref index));
			}
			if (index < data.Length)
			{
				Send(session.MqttSocket, MqttHelper.BuildMqttCommand(9, 0, MqttHelper.BuildIntBytes(num), new byte[1]
				{
					data[index]
				}).Content);
			}
			else
			{
				Send(session.MqttSocket, MqttHelper.BuildMqttCommand(9, 0, null, MqttHelper.BuildIntBytes(num)).Content);
			}
			lock (keysLock)
			{
				for (int i = 0; i < list.Count; i++)
				{
					if (retainKeys.ContainsKey(list[i]))
					{
						Send(session.MqttSocket, MqttHelper.BuildPublishMqttCommand(list[i], retainKeys[list[i]]).Content);
					}
				}
			}
			session.AddSubscribe(list.ToArray());
		}

		private void DealWithUnSubscribe(MqttSession session, byte code, byte[] data)
		{
			int num = 0;
			int i = 0;
			num = MqttHelper.ExtraIntFromBytes(data, ref i);
			List<string> list = new List<string>();
			for (; i < data.Length; i++)
			{
				list.Add(MqttHelper.ExtraMsgFromBytes(data, ref i));
			}
			Send(session.MqttSocket, MqttHelper.BuildMqttCommand(11, 0, null, MqttHelper.BuildIntBytes(num)).Content);
			session.RemoveSubscribe(list.ToArray());
		}

		/// <summary>
		/// 向指定的客户端发送主题及负载数据<br />
		/// Sends the topic and payload data to the specified client
		/// </summary>
		/// <param name="session">会话内容</param>
		/// <param name="topic">主题</param>
		/// <param name="payload">消息内容</param>
		public void PublishTopicPayload(MqttSession session, string topic, byte[] payload)
		{
			OperateResult operateResult = Send(session.MqttSocket, MqttHelper.BuildPublishMqttCommand(topic, payload).Content);
			if (!operateResult.IsSuccess)
			{
				base.LogNet?.WriteError(ToString(), "Send Topic Failed,Client Id:" + session.ClientId);
			}
		}

		/// <summary>
		/// 从服务器向订阅了指定的主题的客户端发送消息，默认消息不驻留<br />
		/// Sends a message from the server to a client that subscribes to the specified topic; the default message does not retain
		/// </summary>
		/// <param name="topic">主题</param>
		/// <param name="payload">消息内容</param>
		/// <param name="retain">指示消息是否驻留</param>
		public void PublishTopicPayload(string topic, byte[] payload, bool retain = false)
		{
			lock (sessionsLock)
			{
				for (int i = 0; i < mqttSessions.Count; i++)
				{
					if (mqttSessions[i].IsClientSubscribe(topic) && mqttSessions[i].Protocol == "MQTT")
					{
						OperateResult operateResult = Send(mqttSessions[i].MqttSocket, MqttHelper.BuildPublishMqttCommand(topic, payload).Content);
						if (!operateResult.IsSuccess)
						{
							base.LogNet?.WriteError(ToString(), "Send Topic Failed,Client Id:" + mqttSessions[i].ClientId);
						}
					}
				}
			}
			if (retain)
			{
				RetainTopicPayload(topic, payload);
			}
		}

		/// <summary>
		/// 向所有的客户端强制发送主题及负载数据，默认消息不驻留<br />
		/// Send subject and payload data to all clients compulsively, and the default message does not retain
		/// </summary>
		/// <param name="topic">主题</param>
		/// <param name="payload">消息内容</param>
		/// <param name="retain">指示消息是否驻留</param>
		public void PublishAllClientTopicPayload(string topic, byte[] payload, bool retain = false)
		{
			lock (sessionsLock)
			{
				for (int i = 0; i < mqttSessions.Count; i++)
				{
					if (mqttSessions[i].Protocol == "MQTT")
					{
						OperateResult operateResult = Send(mqttSessions[i].MqttSocket, MqttHelper.BuildPublishMqttCommand(topic, payload).Content);
						if (!operateResult.IsSuccess)
						{
							base.LogNet?.WriteError(ToString(), "Send Topic Failed,Client Id:" + mqttSessions[i].ClientId);
						}
					}
				}
			}
			if (retain)
			{
				RetainTopicPayload(topic, payload);
			}
		}

		/// <summary>
		/// 向指定的客户端ID强制发送消息，默认消息不驻留<br />
		/// Forces a message to the specified client ID, and the default message does not retain
		/// </summary>
		/// <param name="clientId">指定的客户端ID信息</param>
		/// <param name="topic">主题</param>
		/// <param name="payload">消息内容</param>
		/// <param name="retain">指示消息是否驻留</param>
		public void PublishTopicPayload(string clientId, string topic, byte[] payload, bool retain = false)
		{
			lock (sessionsLock)
			{
				for (int i = 0; i < mqttSessions.Count; i++)
				{
					if (mqttSessions[i].ClientId == clientId && mqttSessions[i].Protocol == "MQTT")
					{
						OperateResult operateResult = Send(mqttSessions[i].MqttSocket, MqttHelper.BuildPublishMqttCommand(topic, payload).Content);
						if (!operateResult.IsSuccess)
						{
							base.LogNet?.WriteError(ToString(), "Send Topic Failed,Client Id:" + mqttSessions[i].ClientId);
						}
					}
				}
			}
			if (retain)
			{
				RetainTopicPayload(topic, payload);
			}
		}

		/// <summary>
		/// 向客户端发布一个进度报告的信息，仅用于同步网络的时候才支持进度报告，将进度及消息发送给客户端，比如你的服务器需要分成5个部分完成，可以按照百分比提示给客户端当前服务器发生了什么<br />
		/// Publish the information of a progress report to the client. The progress report is only supported when the network is synchronized. 
		/// The progress and the message are sent to the client. For example, your server needs to be divided into 5 parts to complete. 
		/// You can prompt the client according to the percentage. What happened to the server
		/// </summary>
		/// <param name="session">当前的网络会话</param>
		/// <param name="topic">回发客户端的关键数据，可以是百分比字符串，甚至是自定义的任意功能</param>
		/// <param name="payload">数据消息</param>
		public void ReportProgress(MqttSession session, string topic, string payload)
		{
			if (session.Protocol == "HUSL")
			{
				payload = (payload ?? string.Empty);
				OperateResult operateResult = Send(session.MqttSocket, MqttHelper.BuildMqttCommand(15, 0, MqttHelper.BuildSegCommandByString(topic), Encoding.UTF8.GetBytes(payload)).Content);
				if (!operateResult.IsSuccess)
				{
					base.LogNet?.WriteError(ToString(), "Send Progress Topic Failed,Client Id:" + session.ClientId);
				}
				return;
			}
			throw new Exception("ReportProgress only support sync communication");
		}

		private void AddMqttSession(MqttSession session)
		{
			lock (sessionsLock)
			{
				mqttSessions.Add(session);
			}
			base.LogNet?.WriteDebug(ToString(), "Client[" + session.ClientId + "] Name:" + session.UserName + " Online");
		}

		/// <summary>
		/// 让MQTT客户端正常下线，调用本方法即可自由控制会话客户端强制下线操作。
		/// </summary>
		/// <param name="session">当前的会话信息</param>
		public void RemoveAndCloseSession(MqttSession session)
		{
			lock (sessionsLock)
			{
				mqttSessions.Remove(session);
			}
			session.MqttSocket?.Close();
			base.LogNet?.WriteDebug(ToString(), "Client[" + session.ClientId + "] Name:" + session.UserName + " Offline");
			if (session.Protocol == "MQTT")
			{
				this.OnClientDisConnected?.Invoke(session);
			}
		}

		/// <summary>
		/// 获取所有的驻留的消息的主题，如果消息发布的时候没有使用Retain属性，就无法通过本方法查到
		/// </summary>
		/// <returns>主题的数组</returns>
		public string[] GetAllRetainTopics()
		{
			string[] result = null;
			lock (keysLock)
			{
				result = retainKeys.Select((KeyValuePair<string, byte[]> m) => m.Key).ToArray();
			}
			return result;
		}

		/// <inheritdoc />
		public override string ToString()
		{
			return $"MqttServer[{base.Port}]";
		}
	}
}
